<?php
/**
 * Created by PhpStorm.
 * User: 11893
 * Date: 2019/1/14
 * Time: 16:44
 */
require_once 'RedisPool.php';

/**
 * WebSocket chat server built on Swoole.
 *
 * Responsibilities visible in this class:
 *  - accepts WebSocket connections on 0.0.0.0:9501;
 *  - keeps one row per logged-in connection in a shared swoole_table;
 *  - broadcasts every non-login message to all rows of that table from a
 *    task worker;
 *  - queues each broadcast message on the Redis list "swoole";
 *  - runs an extra process that pops that list and stores each entry in
 *    the MySQL table `msg`.
 *
 * The constructor configures AND starts the server.
 */
class Server
{
    /** @var swoole_websocket_server The underlying Swoole server instance. */
    protected $srv;

    /** @var array Not used by this class itself; kept for subclasses. */
    protected $workers = [];

    /**
     * Builds the server, the connection table, the Redis pool and the queue
     * consumer process, registers the event handlers and starts the server.
     */
    public function __construct()
    {
        $serv = new swoole_websocket_server('0.0.0.0', 9501);

        // One row per logged-in connection, keyed "user_<fd>".
        $table = new swoole_table(1024);
        $table->column('fd', swoole_table::TYPE_INT);
        $table->column('from_id', swoole_table::TYPE_INT);
        $table->column('id', swoole_table::TYPE_INT);
        $table->create();

        $serv->table = $table;
        $serv->pool = new RedisPool();
        $serv->set([
            'worker_num' => 2,
            'task_worker_num' => 4,
        ]);

        $serv->addProcess($this->createProcess($serv));

        $this->srv = $serv;
        $this->listen();
        $this->srv->start();
    }

    /**
     * Registers the start/open/task/finish/message/close event handlers.
     */
    protected function listen()
    {
        $this->srv->on('start', function () {
            echo "Client:start.\n";
        });

        $this->srv->on('open', function (Swoole\WebSocket\Server $server, $request) {
            echo "server: handshake success with fd{$request->fd}\n";
        });

        // Task worker: broadcast the message to every connection in the table.
        $this->srv->on('task', function (swoole_server $serv, $task_id, $from_id, $data) {
            // Encode once; the payload is identical for every recipient.
            $payload = json_encode($data);
            foreach ($serv->table as $row) {
                // Skip rows whose connection is already gone so we do not
                // push to a dead fd.
                if ($serv->exist($row['fd'])) {
                    $serv->push($row['fd'], $payload);
                }
            }
            $serv->finish($data);
        });

        // After a broadcast, queue the message on Redis for persistence.
        $this->srv->on('finish', function (swoole_server $serv, $task_id, $data) {
            $redis = $serv->pool->get();
            if ($redis) {
                $redis->lPush('swoole', json_encode($data));
                $serv->pool->put($redis);
            }
        });

        $this->srv->on('message', function (swoole_websocket_server $serv, $frame) {
            $fd = $frame->fd;
            $json = json_decode($frame->data, true);

            // Frames that are not a JSON object/array cannot be handled;
            // previously they were indexed anyway and "null" was broadcast.
            if (!is_array($json)) {
                return;
            }

            if (isset($json['type']) && $json['type'] == 'login') {
                $user = (isset($json['user']) && is_array($json['user'])) ? $json['user'] : [];
                // NOTE(review): the user *name* is stored in the INT column
                // "id" -- confirm this is intended.
                $serv->table->set('user_' . $fd, [
                    'fd' => $fd,
                    'from_id' => 0,
                    'id' => isset($user['name']) ? $user['name'] : 0,
                ]);
                $user['id'] = uniqid('swoole');
                $json['user'] = $user;
                $serv->push($fd, json_encode($json));
            } else {
                // Every message is dispatched to task worker 0.
                $serv->task($json, 0);
            }
        });

        $this->srv->on('close', function ($serv, $fd) {
            echo "Client: Close.\n";
            $serv->table->del('user_' . $fd);
        });
    }

    /**
     * Creates the process that works as the queue consumer: it pops entries
     * from the Redis list "swoole" and inserts them into the MySQL table `msg`.
     *
     * @param swoole_websocket_server $serv Server whose Redis pool is used.
     *
     * @return swoole_process The process (not started here; the caller adds
     *                        it to the server).
     */
    protected function createProcess($serv)
    {
        return new swoole_process(function (swoole_process $process) use ($serv) {
            // Use the server's master pid when it is populated in this process.
            $masterPid = (isset($serv->master_pid) && $serv->master_pid > 0)
                ? (int) $serv->master_pid
                : null;
            $this->checkMPid($process, $masterPid);

            $redis = $serv->pool->get();

            $mysql = $this->connectMysql();
            if (!$mysql) {
                $process->exit(1);
                return;
            }

            while (true) {
                // Re-check on every iteration so the consumer stops once the
                // master process is gone.
                $this->checkMPid($process, $masterPid);

                if (!$redis) {
                    sleep(1);
                    $redis = $serv->pool->get();
                    continue;
                }

                // brPop yields [listKey, value]; an empty result means timeout.
                $message = $redis->brPop('swoole', 5);
                if (!$message || !isset($message[1])) {
                    sleep(1);
                    continue;
                }

                $this->storeMessage($mysql, $message[0], $message[1]);
            }
        });
    }

    /**
     * Terminates the given process when the master process no longer exists.
     *
     * @param swoole_process $process   The process to terminate.
     * @param int|null       $masterPid Pid to probe; defaults to the parent
     *                                  pid of the calling process.
     */
    protected function checkMPid(swoole_process $process, $masterPid = null)
    {
        if ($masterPid === null) {
            $masterPid = posix_getppid();
        }

        // Signal 0 only tests for existence. The original probed
        // posix_getpid(), i.e. the caller itself, which always exists.
        if (!swoole_process::kill($masterPid, 0)) {
            $msg = "master process exited, worker {$process->pid} also quit\n"; // should go to a log file
            file_put_contents('process.log', $msg, FILE_APPEND);
            $process->exit(1);
        }
    }

    /**
     * Opens the MySQL connection used by the queue consumer.
     *
     * NOTE(review): host and credentials are hard-coded; consider moving them
     * to configuration.
     *
     * @return mysqli|null The connection, or null when connecting or
     *                     selecting the database failed (the error is echoed).
     */
    private function connectMysql()
    {
        try {
            $mysql = mysqli_connect('127.0.0.1', 'homestead', '123456', 'chat');
            // Depending on the mysqli report mode a failure is signalled by a
            // false return value instead of an exception.
            if (!$mysql) {
                echo "MYSQL Connect Error:" . mysqli_connect_error();
                return null;
            }
            if (!$mysql->select_db('chat')) {
                echo "MYSQL Connect Error:" . $mysql->error;
                return null;
            }
            return $mysql;
        } catch (\Exception $e) {
            echo "MYSQL Connect Error:" . $e->getMessage();
            return null;
        }
    }

    /**
     * Inserts one queued message into the `msg` table.
     *
     * A prepared statement is used because the payload originates from
     * WebSocket clients and must never be interpolated into SQL.
     *
     * @param mysqli $mysql   Open MySQL connection.
     * @param string $room    Value for the `room` column.
     * @param string $payload Value for the `msg` column.
     *
     * @return bool True when the row was inserted.
     */
    private function storeMessage($mysql, $room, $payload)
    {
        try {
            $stmt = $mysql->prepare('INSERT INTO msg (`room`, `msg`) VALUES (?, ?)');
            if (!$stmt) {
                echo $mysql->error . PHP_EOL;
                return false;
            }

            $stmt->bind_param('ss', $room, $payload);
            $ok = $stmt->execute();
            if ($ok) {
                echo "success" . PHP_EOL;
            } else {
                echo $stmt->error . PHP_EOL;
            }
            $stmt->close();

            return $ok;
        } catch (\Exception $e) {
            // Raised instead of a false return when mysqli exception
            // reporting is enabled.
            echo $e->getMessage() . PHP_EOL;
            return false;
        }
    }
}

// Entry point: the Server constructor configures the WebSocket server and starts it.
new Server();
